# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
import copy
import numpy as np
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.units import bytes2str
from hysop.tools.numerics import get_dtype
from hysop.tools.misc import prod
from hysop.core.arrays.array_backend import ArrayBackend
from hysop.constants import HYSOP_BOOL, Backend
class MemoryRequest:
    """Memory request that can be requested in get_work_properties()"""

    __slots__ = (
        "backend",
        "alignment",
        "dtype",
        "size",
        "shape",
        "nb_components",
        "id",
    )

    def __init__(
        self,
        backend,
        size=None,
        shape=None,
        dtype=None,
        alignment=None,
        nb_components=1,
    ):
        """
        Creates a memory request to be served from given backend.

        When dtype is not specified, size represents the number of bytes
        else the number of elements of given data type.
        size can be specified directly by a n-dimensional shape parameter.
        In this case, depending on the chosen backend, the output buffer
        may be altered to accomodate this shape (like numpy n-dimensional arrays).
        Every np.bool request is converted to HYSOP_BOOL dtype (ie. some integer type).

        Parameters
        ----------
        backend : ArrayBackend
            Backend the request will be served from.
        size : int, optional
            Number of elements (or bytes when dtype is None). Takes
            precedence over shape: when given, shape is set to (size,).
        shape : tuple, list or np.ndarray, optional
            N-dimensional shape of the requested buffer.
        dtype : np.dtype, optional
            Data type of the requested buffer (defaults to np.uint8).
        alignment : int, optional
            Requested alignment in bytes (must be a power of two).
        nb_components : int, optional
            Number of independent buffers of this description to allocate.

        Raises
        ------
        ValueError
            On invalid size/shape/alignment combinations.
        RuntimeError
            If the request exceeds the maximal allocatable size of backend.
        """
        # Boolean buffers are stored with an integer dtype (HYSOP_BOOL).
        if dtype == np.bool_:
            dtype = HYSOP_BOOL
        check_instance(backend, ArrayBackend)
        check_instance(size, (int, np.integer), allow_none=True)
        check_instance(alignment, (int, np.integer), allow_none=True)
        check_instance(nb_components, (int, np.integer), allow_none=True)
        check_instance(shape, (tuple, list, np.ndarray), allow_none=True)

        if dtype is None:
            # raw byte request: size is expressed in bytes
            dtype = np.dtype(np.uint8)
            assert dtype.itemsize == 1
        else:
            dtype = get_dtype(dtype)
            assert dtype is not None
            dtype = np.dtype(dtype)

        # size takes precedence over shape (shape is collapsed to (size,))
        if size is not None:
            if size < 1:
                raise ValueError("size < 1.")
            shape = (size,)
        elif shape is not None:
            size = 1
            for s in shape:
                if s < 1:
                    raise ValueError(f"shape {shape} < 1")
                size *= s
        else:
            raise ValueError("size and shape have not been specified.")

        # Final alignment is the largest of the user, dtype and hardware
        # alignments; all three must be compatible (divide one another).
        dtype_alignment = self.min_dtype_alignment(dtype)
        hardware_alignment = self.min_hardware_alignment(backend)
        alignment = first_not_None(alignment, hardware_alignment)
        min_alignment = min(hardware_alignment, dtype_alignment, alignment)
        max_alignment = max(hardware_alignment, dtype_alignment, alignment)
        if max_alignment % min_alignment != 0:
            msg = "Incompatible alignments, specified an alignment of {} "
            msg += "but given dtype should be aligned on {} bytes."
            msg = msg.format(alignment, dtype_alignment)
            raise ValueError(msg)
        alignment = max_alignment

        if prod(shape) != size:
            msg = "Shape does not match size (size={}, prod(shape)={})."
            # bugfix: formatted message was previously discarded
            msg = msg.format(size, prod(shape))
            raise ValueError(msg)
        if alignment <= 0:
            msg = f"Alignment should be positive (got {alignment})."
            # bugfix: previously raised ValueError(alignment) instead of msg
            raise ValueError(msg)
        if alignment & (alignment - 1) != 0:
            msg = f"Alignment is not a power of two (got {alignment})."
            # bugfix: previously raised ValueError(alignment) instead of msg
            raise ValueError(msg)

        self.backend = backend
        self.alignment = alignment
        self.dtype = dtype
        self.size = size
        self.shape = tuple(shape)
        self.nb_components = nb_components

        # Check that the worst-case allocation fits on the backend.
        max_bytes = self.max_bytes()
        max_alloc_size = self.max_alloc_size(backend)
        if max_bytes > max_alloc_size:
            msg = "Memory request size {} ({}B) exceeds maximal allocatable size {} ({}B) "
            msg += "for backend {}."
            msg = msg.format(
                bytes2str(max_bytes),
                max_bytes,
                bytes2str(max_alloc_size),
                max_alloc_size,
                backend.full_tag,
            )
            raise RuntimeError(msg)

    def data_bytes_per_component(self):
        """
        Bytes to be allocated per components.
        """
        return self.size * self.bytes_per_element(self.dtype)

    def min_bytes(self):
        """
        Minimum number bytes that may be allocated for all components.
        """
        return self.nb_components * (self.data_bytes_per_component())

    def max_bytes(self):
        """
        Real number bytes that will be allocated for all components
        (worst case: each component may need up to alignment-1 padding bytes).
        """
        return self.nb_components * (
            self.data_bytes_per_component() + self.alignment - 1
        )

    def max_bytes_per_component(self):
        """
        Real number bytes that will be allocated per component,
        including worst-case alignment padding.
        """
        return self.data_bytes_per_component() + self.alignment - 1

    def min_dtype_alignment(self, dtype):
        """
        Returns the minimum alignment of the allocated buffer (in bytes).
        """
        return self.bytes_per_element(dtype)

    def min_hardware_alignment(self, backend):
        """
        Returns the minimum alignment to be hardware aligned (in bytes).
        """
        if backend.kind == Backend.OPENCL:
            return backend.cl_env.device.mem_base_addr_align
        else:
            return 8  # 64 bits by default

    def bytes_per_element(self, dtype):
        """
        Returns the size in bytes of the allocated data type.
        """
        return dtype.itemsize

    def max_alloc_size(self, backend):
        """
        Returns the maximal alloc size supported by backend.
        """
        return backend.max_alloc_size

    @classmethod
    def cartesian_dfield_like(
        cls,
        name,
        dfield,
        nb_components=None,
        initial_values=None,
        dtype=None,
        grid_resolution=None,
        ghosts=None,
        backend=None,
        is_read_only=None,
    ):
        """
        Build a temporary discrete field request shaped like dfield.

        ghosts may be a boolean (True: reuse dfield.ghosts, False: no
        ghosts), a scalar (same ghosts in every direction) or a
        per-direction sequence.
        """
        from hysop.fields.cartesian_discrete_field import (
            CartesianDiscreteScalarFieldView,
        )

        check_instance(dfield, CartesianDiscreteScalarFieldView)

        ghosts = first_not_None(ghosts, dfield.ghosts)
        # bugfix: test bool before np.isscalar, because np.isscalar(True)
        # is True and previously made the boolean branch unreachable.
        if isinstance(ghosts, bool):
            ghosts = dfield.ghosts if (ghosts is True) else (0,) * dfield.dim
        elif np.isscalar(ghosts):
            ghosts = (ghosts,) * dfield.dim
        assert len(ghosts) == dfield.dim
        ghosts = np.asarray(ghosts)

        (dfield, request, request_id) = dfield.tmp_dfield_like(
            name=name,
            backend=backend,
            nb_components=nb_components,
            initial_values=initial_values,
            dtype=dtype,
            grid_resolution=grid_resolution,
            ghosts=ghosts,
            is_read_only=is_read_only,
        )
        return (dfield, request, request_id)

    def stuple(self):
        """Return a tuple of strings describing this request (for reports)."""
        if not hasattr(self, "id"):
            req_id = "None"
        else:
            req_id = self.id
        size = bytes2str(self.min_bytes(), decimal=False)
        ret = (req_id, size, self.nb_components, self.shape, self.dtype, self.alignment)
        return tuple(map(str, ret))

    def __str__(self):
        if not hasattr(self, "id"):
            req_id = "None"
        else:
            req_id = self.id
        msg = "request of size {:<9} (ncomp={}, shape={:<12}, "
        msg += "dtype={:<8}, align={:<2}, id={})"
        msg = msg.format(
            bytes2str(self.min_bytes(), decimal=False),
            self.nb_components,
            self.shape,
            self.dtype,
            self.alignment,
            req_id,
        )
        return msg

    @classmethod
    def empty_like(
        cls,
        a,
        backend=None,
        alignment=None,
        dtype=None,
        size=None,
        shape=None,
        nb_components=None,
    ):
        """Create a MemoryRequest matching the attributes of object a."""
        if hasattr(a, "backend"):
            backend = first_not_None(backend, a.backend)
        if hasattr(a, "alignment"):
            alignment = first_not_None(alignment, a.alignment)
        if hasattr(a, "dtype"):
            dtype = first_not_None(dtype, a.dtype)
        if size is None:
            if hasattr(a, "resolution"):
                shape = first_not_None(shape, a.resolution)
            elif hasattr(a, "shape"):
                shape = first_not_None(shape, a.shape)
        if shape is None:
            if hasattr(a, "npoints"):
                size = first_not_None(size, a.npoints)
            elif hasattr(a, "size"):
                size = first_not_None(size, a.size)
        if hasattr(a, "nb_components"):
            nb_components = first_not_None(nb_components, a.nb_components)
        return MemoryRequest(
            backend=backend,
            alignment=alignment,
            dtype=dtype,
            size=size,
            shape=shape,
            nb_components=nb_components,
        )

    def __call__(self, op=None, request_identifier=None):
        """
        Promote this MemoryRequest object to a MultipleOperatorMemoryRequests object.
        """
        reqs = OperatorMemoryRequests(op)
        reqs.push_mem_request(request_identifier=request_identifier, mem_request=self)
        return reqs()
[docs]
class OperatorMemoryRequests:
    """
    Set of memory requests originating from one operator, sorted by backend.
    """

    def __init__(self, operator):
        """Create an empty set of memory requests for the given operator."""
        self._operator = operator
        self._requests_per_backend = {}
        self._requests_per_identifier = {}

    def push_mem_request(self, request_identifier, mem_request):
        """
        Register mem_request under the unique identifier request_identifier.

        Raises
        ------
        ValueError
            If the identifier was already used, or if mem_request is not
            a MemoryRequest instance.
        """
        if request_identifier in self._requests_per_identifier:
            raise ValueError(f"id {request_identifier} was already requested.")
        if not isinstance(mem_request, MemoryRequest):
            cls = mem_request.__class__.__name__
            raise ValueError(f"Input is not a MemoryRequest (got a {cls}).")
        backend = mem_request.backend
        # tag the request with its identifier
        # (bugfix: this assignment was previously duplicated)
        mem_request.id = request_identifier
        if backend not in self._requests_per_backend:
            self._requests_per_backend[backend] = []
        self._requests_per_backend[backend].append(mem_request)
        self._requests_per_identifier[request_identifier] = mem_request

    def min_bytes_to_allocate(self, backend):
        """Total worst-case bytes required by all requests on given backend."""
        return sum(req.max_bytes() for req in self._requests_per_backend[backend])

    def __call__(self):
        """
        Promote this OperatorMemoryRequests object to a MultipleOperatorMemoryRequests
        object.
        """
        reqs = MultipleOperatorMemoryRequests()
        reqs.push_mem_requests(self)
        return reqs
[docs]
class MultipleOperatorMemoryRequests:
    """
    Set of memory requests originating from one or more operators.
    """

    def __init__(self):
        # {operator: {request_id: (views...)}} filled by allocate()
        self._allocated_buffers = {}
        # {backend: {operator: [MemoryRequest, ...]}}
        self._all_requests_per_backend = {}
        self._allocated = False

    def push_mem_requests(self, *requests):
        """
        Merge one or more OperatorMemoryRequests or
        MultipleOperatorMemoryRequests objects into self.

        Raises
        ------
        ValueError
            If an operator has already requested memory, or an input has
            an unsupported type.
        """
        for mem_requests in requests:
            if isinstance(mem_requests, MultipleOperatorMemoryRequests):
                for (
                    backend,
                    op_requests,
                ) in mem_requests._all_requests_per_backend.items():
                    if backend not in self._all_requests_per_backend.keys():
                        self._all_requests_per_backend[backend] = {}
                    for op, op_reqs in op_requests.items():
                        if op in self._all_requests_per_backend[backend].keys():
                            msg = f"Operator {op} has already requested memory."
                            raise ValueError(msg)
                        self._all_requests_per_backend[backend][op] = op_reqs
            elif isinstance(mem_requests, OperatorMemoryRequests):
                operator = mem_requests._operator
                # renamed from 'requests' to avoid shadowing the outer
                # iteration variable of this method
                for backend, backend_reqs in mem_requests._requests_per_backend.items():
                    if backend not in self._all_requests_per_backend.keys():
                        self._all_requests_per_backend[backend] = {}
                    if operator in self._all_requests_per_backend[backend].keys():
                        msg = f"Operator {operator} has already requested memory."
                        raise ValueError(msg)
                    self._all_requests_per_backend[backend][operator] = backend_reqs
            else:
                cls = mem_requests.__class__
                msg = f"Input is not an OperatorMemoryRequests (got a {cls})."
                raise ValueError(msg)
        return self

    def operators(self):
        """Return the list of all operators that requested memory."""
        ops = []
        for requests in self._all_requests_per_backend.values():
            ops += list(requests.keys())
        return ops

    def __iadd__(self, other):
        if other is None:
            return self
        return self.push_mem_requests(other)

    def min_bytes_to_allocate(self, backend):
        """
        Worst-case bytes to allocate on backend: the maximum over all
        operators (work buffers are shared between operators).
        """
        max_bytes = 0
        for mem_requests in self._all_requests_per_backend[backend].values():
            req_bytes = sum(req.max_bytes() for req in mem_requests)
            max_bytes = max(req_bytes, max_bytes)
        return max_bytes

    def allocate(self, allow_subbuffers):
        """
        Handle memory request issued by operators.
        """
        assert not self._allocated, "Memory requests were already allocated."
        for backend in self._all_requests_per_backend.keys():
            self._allocate_on_backend(backend, allow_subbuffers=allow_subbuffers)
        self._allocated = True
        return self

    def _allocate_on_backend(self, backend, allow_subbuffers):
        """
        Serve all requests on backend, either as sub-buffer views of one
        big allocation (allow_subbuffers) or as one allocation per
        per-component buffer, shared between operators.
        """
        views = self._allocated_buffers
        op_requests = self._all_requests_per_backend[backend]
        check_instance(views, dict)
        check_instance(op_requests, dict)
        total_bytes = self.min_bytes_to_allocate(backend)
        if total_bytes == 0:
            return
        if allow_subbuffers:
            # Single allocation; each operator restarts at offset zero so
            # buffers are reused between operators.
            data = backend.empty(shape=(total_bytes,), dtype=np.uint8)
            for op, requests in op_requests.items():
                check_instance(requests, list, values=MemoryRequest)
                start_idx, end_idx = 0, 0
                for req in requests:
                    req_views = []
                    size = req.data_bytes_per_component()
                    for i in range(req.nb_components):
                        # align on offset and not on pointer anymore (see issue #1)
                        align_offset = -start_idx % req.alignment
                        start_idx += align_offset
                        end_idx = start_idx + size
                        view = (
                            data[start_idx:end_idx]
                            .view(dtype=req.dtype)
                            .reshape(req.shape)
                        )
                        req_views.append(view)
                        if view.base is not data.base:
                            msg = "FATAL ERROR: Could not create views on data because base "
                            msg += "differs on backend {}."
                            msg = msg.format(backend.kind)
                            raise RuntimeError(msg)
                        if view.int_ptr != data.int_ptr + start_idx:
                            msg = "FATAL ERROR: Pointer arithmetic is wrong."
                            msg += " Expected ptr: {}"
                            msg += " Actual ptr: {}"
                            msg = msg.format(data.int_ptr + start_idx, view.int_ptr)
                            raise RuntimeError(msg)
                        if ((view.int_ptr - data.int_ptr) % req.alignment) != 0:
                            # bugfix: message previously had no placeholder,
                            # so the requested alignment was never reported
                            msg = "FATAL ERROR: Could not provide requested offset alignment ({})."
                            msg = msg.format(req.alignment)
                            raise RuntimeError(msg)
                        start_idx = end_idx
                    if op not in views:
                        views[op] = {}
                    if req.nb_components >= 1:
                        views[op][req.id] = tuple(req_views)
                assert end_idx <= total_bytes
        else:
            # One allocation per per-component buffer. Buffer i is sized
            # to the i-th largest request of any single operator, so
            # buffers can be shared between operators.
            buffer_sizes = []
            ordered_requests = {}
            for op, requests in op_requests.items():
                assert op not in ordered_requests
                check_instance(requests, list, values=MemoryRequest)
                op_buffer_sizes = ()
                op_reqs = ()
                for req in requests:
                    nbytes = req.max_bytes_per_component()
                    for i in range(req.nb_components):
                        op_buffer_sizes += (nbytes,)
                        op_reqs += (req,)
                # stable sort by decreasing per-component size
                idx = np.argsort(op_buffer_sizes, kind="mergesort")[::-1]
                op_buffer_sizes = tuple(op_buffer_sizes[i] for i in idx)
                op_sorted_reqs = tuple(op_reqs[i] for i in idx)
                for i, size in enumerate(op_buffer_sizes):
                    if i >= len(buffer_sizes):
                        buffer_sizes.append(size)
                    else:
                        buffer_sizes[i] = max(buffer_sizes[i], size)
                ordered_requests[op] = op_sorted_reqs
            nbuffers = len(buffer_sizes)
            if nbuffers == 0:
                return
            buffers = tuple(
                backend.empty(shape=(nbytes,), dtype=np.uint8)
                for nbytes in buffer_sizes
            )
            for op, requests in ordered_requests.items():
                assert len(buffers) >= len(requests)
                views.setdefault(op, {})
                # group consecutive components of the same request into
                # one tuple of views per request id
                old_req = None
                req_views = []
                for buf, req in zip(buffers, requests):
                    if req != old_req:
                        if old_req is not None:
                            assert old_req.id not in views[op]
                            views[op][old_req.id] = tuple(req_views)
                        req_views = []
                    nbytes = req.data_bytes_per_component()
                    if backend.kind is Backend.HOST:
                        alignment = req.alignment
                        assert buf.size >= nbytes + alignment - 1
                        ptr, read_only = buf.__array_interface__["data"]
                        align_offset = -ptr % alignment
                    else:
                        # no way to enforce more than device alignment for other backends (opencl)
                        # because for OpenCL 1.x a device pointer address can change between kernel
                        # calls.
                        align_offset = 0
                    view = (
                        buf[align_offset : align_offset + nbytes]
                        .view(dtype=req.dtype)
                        .reshape(req.shape)
                    )
                    req_views.append(view)
                    old_req = req
                # flush the last request group
                # (bugfix: guard against an operator with an empty request
                # list, which previously raised AttributeError on None.id)
                if old_req is not None:
                    assert old_req.id not in views[op]
                    views[op][old_req.id] = tuple(req_views)

    def get_buffer(self, operator, request_identifier, handle=False):
        """
        Return the tuple of allocated buffer views for (operator,
        request_identifier). When handle is True, return raw array
        handles instead of hysop array views.

        Raises
        ------
        RuntimeError
            If allocate() has not been called, or operator is unknown.
        ValueError
            If request_identifier is unknown for this operator.
        """
        if not self._allocated:
            msg = "Memory request have not been allocated yet."
            raise RuntimeError(msg)
        if operator not in self._allocated_buffers:
            msg = "Operator {} did not request any extra memory. \nOperators that requested memory are:\n *{}"
            msg = msg.format(
                operator, "\n *".join(str(op) for op in self._allocated_buffers.keys())
            )
            raise RuntimeError(msg)
        op_buffers = self._allocated_buffers[operator]
        if request_identifier not in op_buffers:
            msg = "Unknown request id {} for operator {}."
            msg += "\nValid identifiers are: " + ",".join(
                str(op) for op in op_buffers.keys()
            )
            msg = msg.format(request_identifier, operator)
            raise ValueError(msg)
        buffers = op_buffers[request_identifier]
        if handle:
            from hysop.backend.host.host_buffer import HostBuffer

            if isinstance(buffers[0].handle, HostBuffer):
                buffers = tuple(b.handle.view(np.ndarray) for b in buffers)
            else:
                buffers = tuple(b.handle for b in buffers)
        return buffers

    def sreport(self):
        """Build a human-readable report of all requests, per backend."""
        all_requests = {}
        totals = {}
        for backend, backend_requests in self._all_requests_per_backend.items():
            total = 0
            for op in sorted(
                backend_requests.keys(), key=lambda op: getattr(op, "name", None)
            ):
                op_requests = backend_requests[op]
                sop_request = all_requests.setdefault(backend, {}).setdefault(op, [])
                local_total = 0
                try:
                    opname = f"{op.pretty_name}"
                except AttributeError:
                    opname = None
                for req in op_requests:
                    sop_request.append((opname,) + req.stuple())
                    local_total += req.max_bytes()
                # buffers are shared: only the largest operator counts
                if local_total > total:
                    total = local_total
            totals[backend] = total
        if len(all_requests):
            # compute column widths from titles and actual entries
            sizes = {}
            template = "\n"
            titles = (
                "OPERATOR",
                "REQUEST_ID",
                "SIZE",
                "COMPONENTS",
                "SHAPE",
                "DTYPE",
                "ALIGNMENT",
            )
            for i, k in enumerate(titles):
                k = k.lower()
                template += " "
                size = max(
                    len(req[i])
                    for breqs in all_requests.values()
                    for reqs in breqs.values()
                    for req in reqs
                )
                size = max(size, len(k))
                name = k + "_len"
                sizes[name] = size
                template += "{:" + ("<" if i == 0 else "^") + "{" + name + "}}"
            ss = ""
            for backend, backend_srequests in all_requests.items():
                total = totals[backend]
                kind = backend.kind
                if kind == Backend.OPENCL:
                    precision = f" on device {backend.device.name.strip()}"
                else:
                    precision = ""
                ss += f"\n {backend.full_tag}{precision}:"
                ss += template.format(*titles, **sizes)
                for op in sorted(
                    backend_srequests.keys(), key=lambda op: getattr(op, "name", None)
                ):
                    sop_reqs = backend_srequests[op]
                    for sreq in sop_reqs:
                        ss += template.format(*sreq, **sizes)
                ss += "\n Total extra work buffers requested: {} ({})".format(
                    bytes2str(total, decimal=False), bytes2str(total, decimal=True)
                )
                ss += "\n"
            return ss[1:-1]
        else:
            return " No extra buffers have been requested."